Add DynamoDB source plugin #3349
Conversation
Submitting the PR to receive early feedback. Still working on testing; will submit the UTs soon.
coordinator:
  dynamodb:
    table_name: "coordinator-demo"
    region: "us-west-2"
Can you make an issue to get the configuration for this coordinator moved to data-prepper-config.yaml, to be completed in a future PR?
Sure, will do once this code is merged. I will create an issue that refers to this.
It looks like we can either expose a getCoordinationStore from the current LeaseBasedCoordinator, or by default create the coordinationStore only from data-prepper-config.yaml instead of the coordinator. Not sure which is the best way.
We will want to use the data-prepper-config only. We can use the same flow that we currently use to provide plugins with a source coordinator.
LOG.info("Gracefully shutdown DynamoDB schedulers"); | ||
executor.shutdownNow(); |
Nothing against calling shutdownNow, but I don't think it's categorized as a graceful shutdown since it forces the threads to quit immediately.
You are right, this is confusing. The scheduler will actually be interrupted immediately (which is OK), and the scheduler will implement logic to shut down the real jobs (such as the stream consumer) gracefully. I will remove this.
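For reference, a minimal sketch of the usual ExecutorService idiom for a graceful stop, in contrast with calling shutdownNow() directly; the class name and timeout below are illustrative, not code from this PR.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

final class GracefulShutdownSketch {
    static void shutdownGracefully(final ExecutorService executor) {
        executor.shutdown();                        // stop accepting new tasks
        try {
            // Give in-flight tasks a bounded chance to finish; the timeout is arbitrary here.
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();             // then force-interrupt whatever remains
            }
        } catch (final InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}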
@Override
public void stop() {
    LOG.info("Stop DynamoDB Source");
    dynamoDBService.shutdown();
Should wrap this call in an if (Objects.nonNull(dynamoDBService)) check. Not a big deal, but a NullPointerException would happen if this gets run before line 66.
Will update.
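For illustration, the guarded stop() might look like the sketch below; this is not the merged code, just the shape of the suggested check (it assumes java.util.Objects is imported).

@Override
public void stop() {
    LOG.info("Stop DynamoDB Source");
    if (Objects.nonNull(dynamoDBService)) {
        dynamoDBService.shutdown();
    }
}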
 * <li>Support multiple leases at the same time</li>
 * </ul>
 */
public interface Coordinator {
Could call this EnhancedSourceCoordinator.
// Do nothing
// The consumer must already do one last checkpointing and then release the lease.
LOG.debug("Shard consumer completed with exception");
LOG.error(ex.toString());
Let's give more context to this error with a message saying where the exception occurred.
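One possible shape for that, assuming SLF4J and a shardId variable in scope (both hypothetical here): passing the exception as the last argument logs the stack trace along with the context message.

// Hypothetical: include which shard failed and the full stack trace.
LOG.error("Shard consumer for shard {} completed with an exception", shardId, ex);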
public class StreamConfig {

    public static final String BEGINNING = "BEGINNING";
This should be an enum.
Updated to use an enum.
I changed this back to String due to an exception that was raised in the new Jackson lib (it was running OK previously): Caused by: com.fasterxml.jackson.databind.JsonMappingException: Cannot construct instance of java.lang.Enum (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information.
Will try to fix this in later PRs.
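That Jackson error usually appears when the field is declared as the raw java.lang.Enum (or another abstract type) rather than a concrete enum. A hedged sketch of one way a later PR could map the string values onto a concrete enum; the enum name, constants, and factory are illustrative, not this PR's code.

import com.fasterxml.jackson.annotation.JsonCreator;

// Illustrative enum for the stream start position.
public enum StreamStartPosition {
    BEGINNING,
    LATEST;

    @JsonCreator
    public static StreamStartPosition fromString(final String value) {
        // Accept case-insensitive values from the pipeline YAML.
        return StreamStartPosition.valueOf(value.toUpperCase());
    }
}

The config class would then declare the field as StreamStartPosition instead of String, with @JsonProperty as before.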
@JsonProperty("coordinator")
private PluginModel coordinationStoreConfig;
I don't think we should be injecting coordination configurations here. This duplicates configurations and creates additional confusion for customers.
Perhaps the ideal is to require that customers configure the necessary coordinator in data-prepper-config.yaml?
This will be a separate PR (https://github.com/opensearch-project/data-prepper/pull/3349/files#r1330297254)
Just realized the build is failing due to unit tests in the dynamo source. Once these are resolved I am good to approve. Thanks!
@daixba I do not see any code for e2e acknowledgement support. Will that be added as a separate PR?
@Override
public void start(Buffer<Record<Event>> buffer) {
    LOG.info("Start Processing");
Can you add something to this log line to help clarify what is starting. Perhaps "Start processing DynamoDB streams for {table} on {bucket}" or something similar.
Updated to just 'Start DynamoDB service', which is what it's actually doing; more log details will be on the service side.
    return new Record<>(event);
}

public Record<Event> convertToEvent(Map<String, Object> data) {
It's generally better design to split concerns such as writing to the buffer and converting record types. Perhaps this can be a follow-on.
It was called RecordProcessor, whose purpose is to convert the raw records to Jackson events and then write them to the buffer. It's now renamed to RecordConverter, but I didn't change the scope of this class.
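A possible shape for that follow-on split, only as a sketch: the converter stays responsible for Map-to-event mapping, while a separate collaborator owns the buffer. The class name, timeout, and the assumption of Buffer#writeAll(Collection, int) from data-prepper-api are illustrative, not this PR's code.

import java.util.List;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

// Hypothetical: buffer interaction pulled out of the converter so each part can be tested alone.
class ExportRecordWriter {
    private static final int WRITE_TIMEOUT_MILLIS = 10_000; // illustrative timeout

    private final Buffer<Record<Event>> buffer;

    ExportRecordWriter(final Buffer<Record<Event>> buffer) {
        this.buffer = buffer;
    }

    void write(final List<Record<Event>> records) throws Exception {
        // The converter only produces Record<Event> objects; this class owns writing them.
        buffer.writeAll(records, WRITE_TIMEOUT_MILLIS);
    }
}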
/**
 * Currently, this is to stop all consumers.
 */
public static void stopAll() {
Since this is static, it will stop all consumers in all pipelines. Use an instance per pipeline instead.
Why does this impact other pipelines if they are running in separate containers or on different nodes?
Data Prepper can run multiple parallel pipelines. If you have two pipelines with the dynamodb source, they will share the same static shouldStop variable. Thus, when one pipeline shuts down, the other pipeline's DataFileLoader is effectively stopped. This may not be what the user wants.
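A minimal sketch of the instance-per-pipeline alternative; this is not the actual DataFileLoader body, just the shape of moving the flag off a static field so one pipeline's shutdown cannot affect another's.

// Illustrative only: each pipeline's source owns its own loader instances.
class DataFileLoader implements Runnable {
    private volatile boolean shouldStop = false;   // instance field, not static

    void stop() {
        shouldStop = true;
    }

    @Override
    public void run() {
        while (!shouldStop) {
            // ... read and process the next batch from the export data file ...
        }
    }
}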
private final AtomicInteger numOfWorkers = new AtomicInteger(0);

private static final int MAX_JOB_COUNT = 2;
This should probably be tunable.
Will do more performance tests later. The observation is that the file read is too fast and the ingestion can't catch up.
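If this does become tunable, one hedged sketch is an optional pipeline setting with the current value as the default; the property name and class placement are assumptions, not something defined in this PR.

import com.fasterxml.jackson.annotation.JsonProperty;

// Hypothetical configuration field: keeps 2 as the default but lets users adjust the
// number of concurrent export-file loaders after performance testing.
public class ExportConfig {
    @JsonProperty("max_data_file_jobs")
    private int maxDataFileJobs = 2;

    public int getMaxDataFileJobs() {
        return maxDataFileJobs;
    }
}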
}

@Override
public void run() {
It seems we have this one thread which then creates two other threads. Can we eliminate this "scheduler" thread somehow? Perhaps just run two threads that each acquire a partition and then process it.
} else {
    records = response.records();
}
recordConverter.writeToBuffer(records);
Why do both the file reader thread path and stream scheduler thread path write to the buffer? What is the difference in these outputs?
They are converted into the same Jackson events and then sent to the same buffer and eventually to the same destination.
It's not implemented yet (I don't know how it works). Will need a separate PR if needed.
- `exportJobsSuccess`: measures the total number of export jobs that ran with status completed.
- `exportJobsErrors`: measures the total number of export jobs that could not be submitted or that ran with status failed.
- `exportFilesTotal`: measures the total number of export files generated.
- `exportFilesSuccess`: measures the total number of export files read (to the last line) successfully.
- `exportRecordsTotal`: measures the total number of export records generated.
- `exportRecordsSuccess`: measures the total number of export records processed successfully.
- `exportRecordsErrors`: measures the total number of export records that failed processing.
- `changeEventsSucceeded`: measures the total number of change events processed successfully.
- `changeEventsFailed`: measures the total number of change events that failed processing.
Looks like these metrics mainly cover export. Are there any metrics that we should add for streams? Things like number of shards processed?
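For the stream side, a hedged sketch of what such counters could look like using PluginMetrics; the metric names and class are suggestions, not metrics defined in this PR.

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.metrics.PluginMetrics;

// Hypothetical stream-side counters, assuming the usual PluginMetrics#counter helper.
class StreamMetrics {
    private final Counter shardsProcessed;
    private final Counter changeEventsProcessed;

    StreamMetrics(final PluginMetrics pluginMetrics) {
        this.shardsProcessed = pluginMetrics.counter("shardsProcessed");
        this.changeEventsProcessed = pluginMetrics.counter("changeEventsProcessed");
    }

    void recordShardCompleted() {
        shardsProcessed.increment();
    }

    void recordChangeEvent() {
        changeEventsProcessed.increment();
    }
}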
There are still some unresolved comments from the PR. I have left the conversations in the PR as unresolved. Let's follow up with another PR to address them.
Description
Add support for DynamoDB as a source.
Issues Resolved
Resolves #2932
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.